/**
 * 
 */
package com.ws.framework.remoteservice.core.provider;

import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.shaded.com.google.common.collect.Maps;
import org.apache.zookeeper.CreateMode;

import com.google.common.collect.Lists;
import com.ws.framework.remoteservice.core.annotation.AnnotationParser;
import com.ws.framework.remoteservice.core.annotation.Implement;
import com.ws.framework.remoteservice.core.ex.AnnotationNotFoundException;
import com.ws.framework.remoteservice.core.ex.WRSIException;
import com.ws.framework.remoteservice.core.model.ProviderInfo;
import com.ws.framework.remoteservice.core.model.ServiceKey;
import com.ws.framework.remoteservice.core.protocal.Transport;
import com.ws.framework.remoteservice.core.register.CuratorClient;
import com.ws.framework.remoteservice.core.util.StringUtils;
import com.ws.framework.remoteservice.core.util.WrsiConstants;
import com.ws.framework.remoteservice.core.util.WsProtocolParser;

/**
 * @author WSH
 *
 */
public final class ServicePublisher {

	private static final Logger logger = Logger.getLogger(ServicePublisher.class.getName());

	private static List<ProviderInfo> providers = Lists.newCopyOnWriteArrayList();
	private static ConcurrentMap<ServiceKey, List<ProviderInfo>> localProviderInfos = Maps.newConcurrentMap();
	private static CuratorFramework curatorClient;

	static {
		curatorClient = CuratorClient.get();
		ProviderInfo info = Transport.instance.start();
		providers.add(info);
	}

	public static void publish(Object bean, Class<?> targetClass) {
		Implement implement = AnnotationParser.parseImplement(targetClass);
		if (null == implement) {
			throw new AnnotationNotFoundException(
					"Can not find Implements " + "or Implement annotation in " + targetClass.getName());
		}

		publish(bean, targetClass, implement);
	}

	public static void publish(Object serviceImpl, Class<?> serviceImplClass, Implement implement) {
		Class<?> contract = implement.contract();
		if (null == contract) {
			Class<?>[] interfaces = serviceImplClass.getInterfaces();
			if (interfaces.length == 1) {
				contract = interfaces[0];
			} else {
				throw new WRSIException("Attribute contract  must be " + "specified in @Implement annotation "
						+ "when more than one interface is " + "implemented by the class " + serviceImplClass);
			}
		}
		publish(serviceImpl, contract.getName(), implement.implCode());

	}

	public static void publish(Object serviceImpl, String contractkName, String implCode) {
		if (null == providers || providers.isEmpty()) {
			throw new WRSIException("Can't get provider's base info.");
		}
		export(contractkName, StringUtils.isEmpty(implCode) ? WrsiConstants.registry_for_default_provider : implCode,
				serviceImpl);

		addWatcher();
	}

	private static void export(String contractkName, String implCode, final Object serviceImpl) {
		final ServiceKey serviceKey = new ServiceKey(contractkName, implCode);
		try {
			logger.info("Begin register service..");
			for (ProviderInfo info : providers) {
				curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
						.inBackground(new BackgroundCallback() {
							@Override
							public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
								System.out.println(event);
								if (event.getType().equals(CuratorEventType.CREATE)) {
									ServiceExecutorImpl.instance.register(serviceKey, serviceImpl);
									localProviderInfos.putIfAbsent(serviceKey, providers);
								}
							}
						}).forPath(WsProtocolParser.providerEncode(contractkName, implCode, info));
			}
			logger.info("End register service..");
		} catch (Exception e) {
			logger.warning("Register service occur error. " + e.getMessage());
			throw new WRSIException("Export error. register the service occur exception. " + e.getMessage(), e);
		}
	}

	private static void addWatcher() {
		curatorClient.getConnectionStateListenable().addListener(new ConnectionStateListener() {
			@Override
			public void stateChanged(CuratorFramework client, ConnectionState newState) {
				if (newState == ConnectionState.RECONNECTED) {
					logger.log(Level.INFO, "zookeeper重新链接成功, 重新export");
					for (Iterator<Entry<ServiceKey, List<ProviderInfo>>> iterator = localProviderInfos.entrySet()
							.iterator(); iterator.hasNext();) {
						Entry<ServiceKey, List<ProviderInfo>> entry = iterator.next();
						for (ProviderInfo providerInfo : entry.getValue()) {
							exportWithRetry(entry.getKey(), providerInfo, 0);
						}
					}
				}
			}

		});
	}

	private static final int MAX_RETRY_TIMES = 30;

	private static void exportWithRetry(ServiceKey serviceKey, ProviderInfo providerInfo,
			int index) {
		try {
			curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground()
					.forPath(WsProtocolParser.providerEncode(serviceKey.getContractName(),
							serviceKey.getImplCode(), providerInfo));
		} catch (Exception e) {
			logger.warning("Retry export service occur error,  index :" + index + " e: " + e.getMessage());
			if (index++ < MAX_RETRY_TIMES) {
				try {
					TimeUnit.MILLISECONDS.sleep(5000);
				} catch (InterruptedException e1) {
					// ignore
				}
				exportWithRetry(serviceKey, providerInfo, index);
			} else {
				// TODO need to send msg to developer’s mobilephone, maybe zk‘s
				// server crashed
			}
		}
	}

	public static void destroy() {
		curatorClient.close();
		Transport.instance.destroy();
		providers.clear();
		localProviderInfos.clear();
	}

}
